Column Projection Pushdown: Q1 GROUP BY 16-17x faster#161
Conversation
- Add read_batch(col_indices) overload to ColumnarTable - Add set_required_columns() to VectorizedSeqScanOperator - Propagate required column indices from GROUP BY up to scan - Q1 GROUP BY improves 16-17x (161k -> 2.68M rows/s) - Gap vs DuckDB closes from 385x to 21x (10k) - All 89 tests pass
📝 WalkthroughWalkthroughThe PR adds column projection support to vectorized batch reading. Storage declares and implements a new ChangesVectorized Column Projection
🎯 3 (Moderate) | ⏱️ ~22 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
include/executor/vectorized_operator.hpp (1)
177-177:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winUse consistent schema initialization in parallel path.
Line 177 unconditionally initializes
out_batchwithoutput_schema_, but line 147-148 conditionally usesreduced_schema_when projection is active. If column projection is enabled, this creates batches with mismatched schemas: parallel results havereduced_schema_, butout_batchhasoutput_schema_. While the copy loop (line 178) only iteratessrc.column_count()and avoids out-of-bounds issues, the extra columns inout_batchare wasteful.🔧 Proposed fix for consistency
if (parallel_idx_ < parallel_results_.size()) { auto& src = *parallel_results_[parallel_idx_]; - out_batch.init_from_schema(output_schema_); + out_batch.init_from_schema(required_col_indices_.empty() ? output_schema_ + : reduced_schema_); for (size_t c = 0; c < src.column_count(); ++c) {🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@include/executor/vectorized_operator.hpp` at line 177, The unconditional call to out_batch.init_from_schema(output_schema_) causes schema mismatch when column projection is active (parallel path uses reduced_schema_); update the initialization to use reduced_schema_ when projection is enabled. Locate the code around out_batch.init_from_schema(...) and change it to initialize with reduced_schema_ if projection is active (same condition used at lines ~147-148), ensuring out_batch uses output_schema_ only when projection is not active so the parallel and non-parallel paths produce consistent batch schemas.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 99-102: set_required_columns currently updates
required_col_indices_ and reduced_schema_ but does not update output_schema_,
causing downstream operators to resolve indices against the full schema while
batches use reduced_schema_; update set_required_columns (the method named
set_required_columns) to also set output_schema_ = reduced_schema_ (or otherwise
replace the operator's output_schema_ with reduced_schema_) so output_schema()
matches the projected batch schema; ensure you modify the same function that
assigns required_col_indices_ and reduced_schema_ and keep reduced_schema_ and
required_col_indices_ semantics intact.
In `@src/executor/query_executor.cpp`:
- Around line 1724-1744: The code builds output_schema (GROUP BY keys +
aggregate result types) and passes it to base_scan->set_required_columns,
causing the scan to initialize output columns with aggregate result types and
crash when storage expects the original input types; instead build a
reduced_schema containing the actual input column types for every index in
required_col_indices (use current_root->output_schema().get_column(idx).type()
for group-by key indices and, for agg_infos entries with input_col_idx >=0, use
the table/input column type rather than the aggregate result type) and pass that
reduced_schema to base_scan->set_required_columns; update references around
output_schema, required_col_indices, agg_infos, and the call
base_scan->set_required_columns to use this corrected schema so
ColumnarTable::read_batch sees matching types.
In `@src/storage/columnar_table.cpp`:
- Around line 310-470: The duplicated per-column deserialization logic in
ColumnarTable::read_batch should be extracted into a private helper (suggested
signature: bool deserialize_column(size_t col_idx, uint64_t start_row, uint32_t
actual_rows, executor::ColumnVector& target_col)) that encapsulates opening
nulls/data files (using storage_manager_ and name_), seeking, reading nulls and
data, and appending values for all supported common::ValueType cases; update
both read_batch overloads to call deserialize_column for each col_idx and
propagate its bool result (leave out_batch.set_row_count/return handling in the
callers). Ensure the helper references schema_.get_column(col_idx).type() and
reuses the same deserialization branches (INT64, INT32/16/8,
FLOAT64/FLOAT32/DECIMAL, BOOL, TEXT) so all file IO and value conversion is
centralized.
---
Outside diff comments:
In `@include/executor/vectorized_operator.hpp`:
- Line 177: The unconditional call to out_batch.init_from_schema(output_schema_)
causes schema mismatch when column projection is active (parallel path uses
reduced_schema_); update the initialization to use reduced_schema_ when
projection is enabled. Locate the code around out_batch.init_from_schema(...)
and change it to initialize with reduced_schema_ if projection is active (same
condition used at lines ~147-148), ensuring out_batch uses output_schema_ only
when projection is not active so the parallel and non-parallel paths produce
consistent batch schemas.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 61f2d892-f533-4c37-ba0d-17316e8169e1
📒 Files selected for processing (4)
include/executor/vectorized_operator.hppinclude/storage/columnar_table.hppsrc/executor/query_executor.cppsrc/storage/columnar_table.cpp
| void set_required_columns(std::vector<size_t> col_indices, executor::Schema reduced_schema) { | ||
| required_col_indices_ = std::move(col_indices); | ||
| reduced_schema_ = std::move(reduced_schema); | ||
| } |
There was a problem hiding this comment.
output_schema_ must be updated when column projection is enabled.
When set_required_columns is called, the operator will produce batches with reduced_schema, but output_schema_ (returned by output_schema()) still reflects the full table schema (set at line 82). Downstream operators resolve column indices from child_->output_schema(), then access batch columns at those indices. If the indices refer to the full schema but the batch has a reduced schema, column accesses will retrieve wrong data or crash from out-of-bounds access.
For example:
- Table schema:
[id INT, cat TEXT, val INT] - Projection: columns
[cat, val]→ indices[1, 2]in full schema - Batch schema after projection:
[cat, val]→ positions[0, 1] - GROUP BY resolves "cat" → index 1 (from full schema)
batch.get_column(1)retrieves "val" (batch position 1) instead of "cat" → wrong result
🐛 Proposed fix
void set_required_columns(std::vector<size_t> col_indices, executor::Schema reduced_schema) {
required_col_indices_ = std::move(col_indices);
reduced_schema_ = std::move(reduced_schema);
+ output_schema_ = reduced_schema_; // Update output schema to match actual batch schema
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@include/executor/vectorized_operator.hpp` around lines 99 - 102,
set_required_columns currently updates required_col_indices_ and reduced_schema_
but does not update output_schema_, causing downstream operators to resolve
indices against the full schema while batches use reduced_schema_; update
set_required_columns (the method named set_required_columns) to also set
output_schema_ = reduced_schema_ (or otherwise replace the operator's
output_schema_ with reduced_schema_) so output_schema() matches the projected
batch schema; ensure you modify the same function that assigns
required_col_indices_ and reduced_schema_ and keep reduced_schema_ and
required_col_indices_ semantics intact.
| executor::Schema output_schema; | ||
| std::vector<size_t> required_col_indices; | ||
| for (const auto& gb : stmt.group_by()) { | ||
| const auto& gb_name = gb->to_string(); | ||
| size_t idx = current_root->output_schema().find_column(gb_name); | ||
| if (idx != static_cast<size_t>(-1)) { | ||
| output_schema.add_column(current_root->output_schema().get_column(idx).name(), | ||
| current_root->output_schema().get_column(idx).type(), | ||
| current_root->output_schema().get_column(idx).nullable()); | ||
| required_col_indices.push_back(idx); | ||
| } | ||
| } | ||
| for (size_t i = 0; i < agg_infos.size(); ++i) { | ||
| output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64, | ||
| false); | ||
| if (agg_infos[i].input_col_idx >= 0) { | ||
| required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx)); | ||
| } | ||
| } | ||
|
|
||
| base_scan->set_required_columns(required_col_indices, output_schema); |
There was a problem hiding this comment.
Wrong schema passed to set_required_columns — causes type mismatch and crashes.
The output_schema built at lines 1724-1742 is the GROUP BY operator's output schema (keys + aggregate results), not the scan operator's output schema (keys + aggregate inputs). When passed to set_required_columns at line 1744, this causes the scan to initialize out_batch with aggregate result types (e.g., FLOAT64 for SUM) instead of input types (e.g., INT for the SUM input column).
Then in ColumnarTable::read_batch, the code reads the table column type from the table schema (INT), attempts to dynamic_cast the out_batch column to the corresponding vector type (NumericVector<int64_t>), but the column was initialized as FLOAT64 (NumericVector<double>). The cast throws std::bad_cast, crashing the query.
Example:
- Table:
(cat TEXT, val INT) - Query:
SELECT cat, SUM(val) FROM test_table GROUP BY cat output_schemaat line 1744:(cat TEXT, agg_0 FLOAT64)— includes SUM output typerequired_col_indices:[0, 1]— cat and val- Scan creates batch with schema
(cat TEXT, agg_0 FLOAT64) - Storage tries to deserialize val (INT) into batch column 1 (FLOAT64 vector) → crash
🐛 Proposed fix
Build reduced_schema from the required input columns instead of the GROUP BY output schema:
executor::Schema output_schema;
std::vector<size_t> required_col_indices;
+ executor::Schema reduced_input_schema; // Schema of columns scanned from table
for (const auto& gb : stmt.group_by()) {
const auto& gb_name = gb->to_string();
size_t idx = current_root->output_schema().find_column(gb_name);
if (idx != static_cast<size_t>(-1)) {
+ const auto& col = current_root->output_schema().get_column(idx);
+ reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
output_schema.add_column(current_root->output_schema().get_column(idx).name(),
current_root->output_schema().get_column(idx).type(),
current_root->output_schema().get_column(idx).nullable());
required_col_indices.push_back(idx);
}
}
for (size_t i = 0; i < agg_infos.size(); ++i) {
output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64,
false);
if (agg_infos[i].input_col_idx >= 0) {
+ size_t input_idx = static_cast<size_t>(agg_infos[i].input_col_idx);
+ const auto& col = current_root->output_schema().get_column(input_idx);
+ reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx));
}
}
- base_scan->set_required_columns(required_col_indices, output_schema);
+ base_scan->set_required_columns(required_col_indices, reduced_input_schema);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| executor::Schema output_schema; | |
| std::vector<size_t> required_col_indices; | |
| for (const auto& gb : stmt.group_by()) { | |
| const auto& gb_name = gb->to_string(); | |
| size_t idx = current_root->output_schema().find_column(gb_name); | |
| if (idx != static_cast<size_t>(-1)) { | |
| output_schema.add_column(current_root->output_schema().get_column(idx).name(), | |
| current_root->output_schema().get_column(idx).type(), | |
| current_root->output_schema().get_column(idx).nullable()); | |
| required_col_indices.push_back(idx); | |
| } | |
| } | |
| for (size_t i = 0; i < agg_infos.size(); ++i) { | |
| output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64, | |
| false); | |
| if (agg_infos[i].input_col_idx >= 0) { | |
| required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx)); | |
| } | |
| } | |
| base_scan->set_required_columns(required_col_indices, output_schema); | |
| executor::Schema output_schema; | |
| std::vector<size_t> required_col_indices; | |
| executor::Schema reduced_input_schema; // Schema of columns scanned from table | |
| for (const auto& gb : stmt.group_by()) { | |
| const auto& gb_name = gb->to_string(); | |
| size_t idx = current_root->output_schema().find_column(gb_name); | |
| if (idx != static_cast<size_t>(-1)) { | |
| const auto& col = current_root->output_schema().get_column(idx); | |
| reduced_input_schema.add_column(col.name(), col.type(), col.nullable()); | |
| output_schema.add_column(current_root->output_schema().get_column(idx).name(), | |
| current_root->output_schema().get_column(idx).type(), | |
| current_root->output_schema().get_column(idx).nullable()); | |
| required_col_indices.push_back(idx); | |
| } | |
| } | |
| for (size_t i = 0; i < agg_infos.size(); ++i) { | |
| output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64, | |
| false); | |
| if (agg_infos[i].input_col_idx >= 0) { | |
| size_t input_idx = static_cast<size_t>(agg_infos[i].input_col_idx); | |
| const auto& col = current_root->output_schema().get_column(input_idx); | |
| reduced_input_schema.add_column(col.name(), col.type(), col.nullable()); | |
| required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx)); | |
| } | |
| } | |
| base_scan->set_required_columns(required_col_indices, reduced_input_schema); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/executor/query_executor.cpp` around lines 1724 - 1744, The code builds
output_schema (GROUP BY keys + aggregate result types) and passes it to
base_scan->set_required_columns, causing the scan to initialize output columns
with aggregate result types and crash when storage expects the original input
types; instead build a reduced_schema containing the actual input column types
for every index in required_col_indices (use
current_root->output_schema().get_column(idx).type() for group-by key indices
and, for agg_infos entries with input_col_idx >=0, use the table/input column
type rather than the aggregate result type) and pass that reduced_schema to
base_scan->set_required_columns; update references around output_schema,
required_col_indices, agg_infos, and the call base_scan->set_required_columns to
use this corrected schema so ColumnarTable::read_batch sees matching types.
| for (size_t idx = 0; idx < col_indices.size(); ++idx) { | ||
| size_t col_idx = col_indices[idx]; | ||
| const std::string base = name_ + ".col" + std::to_string(col_idx); | ||
| std::ifstream n_in(storage_manager_.get_full_path(base + ".nulls.bin"), std::ios::binary); | ||
| std::ifstream d_in(storage_manager_.get_full_path(base + ".data.bin"), std::ios::binary); | ||
| if (!n_in.is_open() || !d_in.is_open()) return false; | ||
|
|
||
| auto& target_col = out_batch.get_column(idx); | ||
| const auto type = schema_.get_column(col_idx).type(); | ||
|
|
||
| if (type == common::ValueType::TYPE_INT64) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<int64_t>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg); | ||
| std::vector<int64_t> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else { | ||
| num_vec.append(common::Value::make_int64(data[r])); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_INT32 || type == common::ValueType::TYPE_INT16 || | ||
| type == common::ValueType::TYPE_INT8) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<int64_t>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg); | ||
| std::vector<int64_t> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else if (type == common::ValueType::TYPE_INT32) { | ||
| num_vec.append(common::Value(static_cast<int32_t>(data[r]))); | ||
| } else if (type == common::ValueType::TYPE_INT16) { | ||
| num_vec.append(common::Value(static_cast<int16_t>(data[r]))); | ||
| } else { | ||
| num_vec.append(common::Value(static_cast<int8_t>(data[r]))); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_FLOAT64) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<double>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg); | ||
| std::vector<double> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else { | ||
| num_vec.append(common::Value::make_float64(data[r])); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_FLOAT32) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<float>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg); | ||
| std::vector<double> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else { | ||
| num_vec.append(common::Value(static_cast<float>(data[r]))); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_DECIMAL) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<double>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg); | ||
| std::vector<double> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else { | ||
| num_vec.append(common::Value::make_float64(data[r])); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_BOOL) { | ||
| auto& num_vec = dynamic_cast<executor::NumericVector<bool>&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| d_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> data(actual_rows); | ||
| d_in.read(reinterpret_cast<char*>(data.data()), actual_rows); | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| if (nulls[r] != 0U) { | ||
| num_vec.append(common::Value::make_null()); | ||
| } else { | ||
| num_vec.append(common::Value(data[r] != 0)); | ||
| } | ||
| } | ||
| } else if (type == common::ValueType::TYPE_TEXT || | ||
| type == common::ValueType::TYPE_VARCHAR || | ||
| type == common::ValueType::TYPE_CHAR) { | ||
| auto& str_vec = dynamic_cast<executor::StringVector&>(target_col); | ||
|
|
||
| n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg); | ||
| std::vector<uint8_t> nulls(actual_rows); | ||
| n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows); | ||
|
|
||
| if (start_row > 0) { | ||
| for (uint32_t r = 0; r < start_row; ++r) { | ||
| uint32_t len = 0; | ||
| if (!d_in.read(reinterpret_cast<char*>(&len), 4)) break; | ||
| if (len > 0) { | ||
| d_in.seekg(static_cast<std::streamoff>(len), std::ios::cur); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| for (uint32_t r = 0; r < actual_rows; ++r) { | ||
| uint32_t len = 0; | ||
| d_in.read(reinterpret_cast<char*>(&len), 4); | ||
| std::string s(len, '\0'); | ||
| d_in.read(s.data(), len); | ||
| if (nulls[r] != 0U) { | ||
| str_vec.append(common::Value::make_null()); | ||
| } else { | ||
| str_vec.append(common::Value::make_text(s)); | ||
| } | ||
| } | ||
| } else { | ||
| throw std::runtime_error("ColumnarTable::read_batch(col_indices): Unsupported type " + | ||
| std::to_string(static_cast<int>(type))); | ||
| } | ||
| } | ||
| out_batch.set_row_count(actual_rows); | ||
| return true; | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Consider extracting shared deserialization logic into a helper method.
The type-dispatch deserialization logic (lines 320-466) is nearly identical to the original read_batch implementation (lines 141-292). This ~150-line duplication creates maintenance burden: any type support changes or bug fixes must be applied in both places, risking divergence.
♻️ Suggested refactoring approach
Extract a private helper method:
private:
// Helper: deserialize a single column from disk into a vector
bool deserialize_column(size_t col_idx, uint64_t start_row, uint32_t actual_rows,
executor::ColumnVector& target_col);Then both read_batch overloads can call this helper in their loops, eliminating duplication.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/storage/columnar_table.cpp` around lines 310 - 470, The duplicated
per-column deserialization logic in ColumnarTable::read_batch should be
extracted into a private helper (suggested signature: bool
deserialize_column(size_t col_idx, uint64_t start_row, uint32_t actual_rows,
executor::ColumnVector& target_col)) that encapsulates opening nulls/data files
(using storage_manager_ and name_), seeking, reading nulls and data, and
appending values for all supported common::ValueType cases; update both
read_batch overloads to call deserialize_column for each col_idx and propagate
its bool result (leave out_batch.set_row_count/return handling in the callers).
Ensure the helper references schema_.get_column(col_idx).type() and reuses the
same deserialization branches (INT64, INT32/16/8, FLOAT64/FLOAT32/DECIMAL, BOOL,
TEXT) so all file IO and value conversion is centralized.
Summary
Benchmark Results
Tests
Summary by CodeRabbit
Release Notes